nacos源码分析

您所在的位置:网站首页 nacos 注册服务 nacos源码分析

nacos源码分析

2024-01-05 04:11| 来源: 网络整理| 查看: 265

安装Nacos源码

上一篇文章我们了解了《Nacos服务注册》客户端源码,本篇文章我们来看一下服务注册Nacos服务端的源码执行情况。首先需要下载Nacos源码, https://github.com/alibaba/nacos/releases/tag/1.4.3 ,在这里插入图片描述解压之后使用IDEA工具导入即可。

在这里插入图片描述但是编译过后发现代码会报错,主要是缺少实体类,比如:在这里插入图片描述

安装protobuf

这主要是应该nacos数据通信底层使用到protobuf进行序列化(与JSON类似),是Google提供的一种数据序列化协议

Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。

所以这里我们需要安装protobuf ,先去下载 https://github.com/protocolbuffers/protobuf/releases,下载window版本如下:在这里插入图片描述

下载之后解压

在这里插入图片描述

然后需要配置环境变量

在这里插入图片描述

找到consistency模块,进入src/main

在这里插入图片描述

进入main目录,执行cmd命令protoc --java_out=./java ./proto/consistency.proto protoc --java_out=./java ./proto/Data.proto 效果如下:在这里插入图片描述启动Nacos

找到console控制台,启动Nacos,第一次启动会报错,因为默认是以集群方式启动,会出现jdbc.properties找不到的错误在这里插入图片描述

然后指定为单机启动,指定VM参数

在这里插入图片描述

启动成功

在这里插入图片描述

访问 http://localhost:8848/nacos/index.html 进入控制台

在这里插入图片描述

到这里,nacos服务端的源码就启动成功了,那么我们尝试启动nacos-client程序,让他注册到nacos-server

在这里插入图片描述

查看控制台,nacos-client成功注册到服务端

在这里插入图片描述

服务注册

在上一章节《Nacos源码分析-服务注册(客户端)》我们有分析到,nacos-client提交注册的地址是post /nacos/v1/ns/instance,那么我们在nacos-server源码中找到该接口,它位于 naming 模块中的/controllers包下的InstanceController接口中。源码如下

@RestController@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT +"/instance")publicclassInstanceController{ @Autowiredprivate SwitchDomain switchDomain;@Autowiredprivate PushService pushService;@Autowiredprivate ServiceManager serviceManager;...省略.../** 注册一个新的实例 * Register new instance. * * @param request http request * @return 'ok' if success * @throws Exception any error during register */@CanDistro@PostMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)//request请求对象中包括了注册的服务的port,namespaceId,groupName,serviceName,ip,集群名等等public String register(HttpServletRequest request)throws Exception { //拿到注册的服务的:namespaceId,默认是publicfinal String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);//拿到注册的服务的:serviceName服务名会把组名加在前面,比如:DEFAULT_GROUP@@nacos-clientfinal String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);//检查服务名的格式:groupName@@serviceName NamingUtils.checkServiceNameFormat(serviceName);//解析请求参数,封装服务实例对戏,把注册的服务封装为Instance,其中包括IP,端口,服务名等final Instance instance =parseInstance(request);//使用ServiceManger注册服务实例 serviceManager.registerInstance(namespaceId, serviceName, instance);return"ok";}//解析要注册的服务实例private Instance parseInstance(HttpServletRequest request)throws Exception { //拿到服务名 DEFAULT_GROUP@@nacos-client String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);//拿到app,没配置就是:unknown String app = WebUtils.optional(request,"app","DEFAULT");//拿到注册服务的:IP,是否开启服务,权重,健康状况,等封装为Instance 对象 Instance instance =getIpAddress(request); instance.setApp(app); instance.setServiceName(serviceName);// Generate simple instance id first. This value would be updated according to// 生成实例的ID:192.168.174.1#8080#DEFAULT#DEFAULT_GROUP@@nacos-client instance.setInstanceId(instance.generateInstanceId());//设置最后的心跳时间为当前时间 instance.setLastBeat(System.currentTimeMillis()); String metadata = WebUtils.optional(request,"metadata", StringUtils.EMPTY);if(StringUtils.isNotEmpty(metadata)){ instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));}//验证实例 instance.validate();return instance;}

register方法中会从请求对象中拿到注册的参数比如IP,是否开启服务,权重,健康状况等,然后封装为 instance对象,交给 serviceManager.registerInstance 去注册,下面是 serviceManager.registerInstance的源码

缓存和初始化serivce@ComponentpublicclassServiceManagerimplementsRecordListener{ /** * Map(namespace, Map(group::serviceName, Service)). */privatefinal Map serviceMap =newConcurrentHashMap();...省略部分代码...//注册服务实例publicvoidregisterInstance(String namespaceId, String serviceName, Instance instance)throws NacosException { //1.会尝试从serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,// 并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager的一个ConcurrentHashMap中// 服务注册表的结构是MapcreateEmptyService(namespaceId, serviceName, instance.isEphemeral());//从注册表中获取服务,注册表是一个Map结构,// 先根据namespaceId取得到Map,然后再根据serviceName取Service Service service =getService(namespaceId, serviceName);//参数无效,没有找到服务if(service == null){ thrownewNacosException(NacosException.INVALID_PARAM,"service not found, namespace: "+ namespaceId +", service: "+ serviceName);}//添加 instance 服务实例到注册表addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);}...省略部分代码...//二.创建service,并初始化publicvoidcreateServiceIfAbsent(String namespaceId, String serviceName,boolean local, Cluster cluster)throws NacosException { Service service =getService(namespaceId, serviceName);//如果服务不存在就创建一个serviceif(service == null){ Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service =newService(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName));// now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum();if(cluster != null){ cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster);} service.validate();//保存service和初始化serviceputServiceAndInit(service);if(!local){ addOrReplaceService(service);}}}//保存service和初始化serviceprivatevoidputServiceAndInit(Service service)throws NacosException { //保存serviceputService(service); service =getService(service.getNamespaceId(), service.getName());//初始化service service.init();//consistencyService.listen实现数据一致性监听 consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(),true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(),false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());}//保存service到注册表中publicvoidputService(Service service){ if(!serviceMap.containsKey(service.getNamespaceId())){ synchronized(putServiceLock){ if(!serviceMap.containsKey(service.getNamespaceId())){ serviceMap.put(service.getNamespaceId(),newConcurrentSkipListMap());}}}//把注册的服务存储到Map中 serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);}

registerInstance做了三个事情

通过putService()方法将服务缓存到内存

service.init()建立心跳机制

consistencyService.listen实现数据一致性监听

registerInstance方法会尝试从ServiceManager#serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager#serviceMap中。

该Map是一个ConcurrentHashMap,结构是Map>。第一个Key是NamespaceId 如:public ,第二个key是服务名,如 : DEFAULT_GROUP@@nacos-client

在这里插入图片描述

这就是nacos中的的服务注册表,用来存放注册的服务实例的Map.

在这里插入图片描述注意:service和instance的关系是,一个service中包含一个 Map , 一个Cluster中包含一个 Set。

service代表一个服务:比如用户服务Cluster代表服务集群,比如2个用户服务形成一个集群而一个集群中有多个服务实例,所以Cluster中有了Set 来保存服务实例

除此之外还会调用 com.alibaba.nacos.naming.core.Service#init 方法对service进行初始化,下面是init方法的源码

publicvoidinit(){ //clientBeatCheckTask 是一个Runnable,它持有service,它的作用是//检查并更新临时实例的状态,如果它们已过期,则将其删除 HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for(Map.Entry entry : clusterMap.entrySet()){ entry.getValue().setService(this); entry.getValue().init();}}//定时任务:定时检查服务的健康状况,5S一次publicstaticvoidscheduleCheck(ClientBeatCheckTask task){ futureMap.computeIfAbsent(task.taskKey(), k -> GlobalExecutor.scheduleNamingHealth(task,5000,5000, TimeUnit.MILLISECONDS));}

service.init 初始化方法中主要是把service封装到 ClientBeatCheckTask 对象中,ClientBeatCheckTask 是一个Runnable线程对象,然后使用定时任务5s执行一次健康检查。 ClientBeatCheckTask 的作用是 : 检查并更新临时实例的状态,如果它们已过期,则将其删除

下面是 com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run 线程对象的源码

publicvoidrun(){ try{ if(!getDistroMapper().responsible(service.getName())){ return;}if(!getSwitchDomain().isHealthCheckEnabled()){ return;}//拿到服务中的所有实例 List instances = service.allIPs(true);// first set health status of instances:for(Instance instance : instances){ //当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时if(System.currentTimeMillis()- instance.getLastBeat()> instance.getInstanceHeartBeatTimeOut()){ if(!instance.isMarked()){ if(instance.isHealthy()){ //健康状态设置为false instance.setHealthy(false); Loggers.EVT_LOG .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}", instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(), UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());//发布时间:服务状态改变getPushService().serviceChanged(service);//发布时间:服务实例心跳超时事件 ApplicationUtils.publishEvent(newInstanceHeartbeatTimeoutEvent(this, instance));}}}}if(!getGlobalConfig().isExpireInstance()){ return;}// then remove obsolete instances:for(Instance instance : instances){ if(instance.isMarked()){ continue;}if(System.currentTimeMillis()- instance.getLastBeat()> instance.getIpDeleteTimeout()){ // delete instance Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JacksonUtils.toJson(instance));deleteIp(instance);}}}catch(Exception e){ Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}

run方法中会拿到当前service的所有instance,然后循环 , 如果:当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时,然后会改变instance的Healthy健康状态Wie false; 并抛出 服务实例心跳超时事件

getPushService().serviceChanged(service):方法很有意思,他的作用是通知 nacos-client该服务已经下线(UDP协议 push),这样的话nacos-client就会从本地剔除掉下线的服务。这就是它和eureka不一样的地方,eureka使用的是pull.而 nacos采用pull + push模式。 具体源码见: PushService#onApplicationEvent

publicvoidonApplicationEvent(ServiceChangeEvent event){ Service service = event.getService(); String serviceName = service.getName(); String namespaceId = service.getNamespaceId();//使用定时任务 1s 一次 Future future = GlobalExecutor.scheduleUdpSender(()->{ try{ //服务改变,添加到 push队列 Loggers.PUSH.info(serviceName +" is changed, add it to push queue."); ConcurrentMap clients = clientMap .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));if(MapUtils.isEmpty(clients)){ return;} Map cache =newHashMap(16);long lastRefTime = System.nanoTime();for(PushClient client : clients.values()){ if(client.zombie()){ Loggers.PUSH.debug("client is zombie: "+ client.toString()); clients.remove(client.toString()); Loggers.PUSH.debug("client is zombie: "+ client.toString());continue;} Receiver.AckEntry ackEntry; Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString()); String key =getPushCacheKey(serviceName, client.getIp(), client.getAgent());byte[] compressData = null; Map data = null;if(switchDomain.getDefaultPushCacheMillis()>=20000&& cache.containsKey(key)){ org.javatuples.Pair pair =(org.javatuples.Pair) cache.get(key); compressData =(byte[])(pair.getValue0()); data =(Map) pair.getValue1(); Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());}if(compressData != null){ ackEntry =prepareAckEntry(client, compressData, data, lastRefTime);}else{ ackEntry =prepareAckEntry(client,prepareHostsData(client), lastRefTime);if(ackEntry != null){ cache.put(key,neworg.javatuples.Pair(ackEntry.origin.getData(), ackEntry.data));}} Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", client.getServiceName(), client.getAddrStr(), client.getAgent(),(ackEntry == null ? null : ackEntry.key));//使用udp协议pushudpPush(ackEntry);}}catch(Exception e){ Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);}finally{ futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));}},1000, TimeUnit.MILLISECONDS); futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}添加instance

到这里,service的缓存和初始化就看完了,代码回到 com.alibaba.nacos.naming.core.ServiceManager#registerInstance 。接下来就是分析 addInstance方法

//添加一个instance到Add instance to service.publicvoidaddInstance(String namespaceId, String serviceName,boolean ephemeral, Instance... ips)throws NacosException { //拿到key: com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@nacos-client String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);//拿到service Service service =getService(namespaceId, serviceName);//对service加同步锁,避免并发修改synchronized(service){ //拿到该service中的所有instance List instanceList =addIpAddresses(service, ephemeral, ips);//把实例列表封装到Instances 对象中 Instances instances =newInstances(); instances.setInstanceList(instanceList);//调用consistencyService.put()方法完成Nacos集群的数据同步,保证集群一致性 consistencyService.put(key, instances);}}

addInstance方法中会拿到service中的List实例列表,然后设置到 Instances 中,调用 consistencyService去同步到nacos集群。

这里采用了CopyOnWrite方案。对于 addIPAddress方法会拷贝旧的实例列表添加到新实例到列表中。在同步完nacos集群后,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。

这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案

consistencyService是用作service同步的。代表集群一致性的接口。

在这里插入图片描述

下面看一下 consistencyService.put 方法,底层会调用 DistroConsistencyServiceImpl#put 方法,源码如下

@Overridepublicvoidput(String key, Record value)throws NacosException { //根据key确定是用ephemeralConsistencyService或者persistentConsistencyServicemapConsistencyService(key).put(key, value);}private ConsistencyService mapConsistencyService(String key){ //key以 ephemeral 开头就是临时实例// 临时实例选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl类// 持久实例选择 persistentConsistencyService,也就是PersistentConsistencyServiceDelegateImplreturn KeyBuilder.matchEphemeralKey(key)? ephemeralConsistencyService : persistentConsistencyService;}//初始化方法,@PostConstructpublicvoidinit(){ //把notifier提交给线程池 GlobalExecutor.submitDistroNotifyTask(notifier);}@Overridepublicvoidput(String key, Record value)throws NacosException { //把实例保存到本地实例表onPut(key, value);//使用distro协议同步到集群 distroProtocol.sync(newDistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod()/2);}

put方法中,会先先根据服务的key判断使用临时同步服务ephemeralConsistencyService ,或者持久同步服务persistentConsistencyService。然后会做2个事情

调用onPut :把实例保存到本地实例列表 。调用distroProtocol.sync把实例同步到集群更新服务列表 对于onPut 方法中做了2个事情.一个是把实例封装到Datum对象中,然后交给dataStore存储起来。

另一个是通过notifier.addTask 把key放入阻塞队列,然后会通过线程池异步去执行阻塞队列```javapublic void onPut(String key, Record value) {

//判断是否是临时实例 if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum datum = new Datum(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); //把数据存储到dataStore,内部维护了一个Map dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } //这里是把key放入一个阻塞队列,然后会用线程池异步去执行队列 notifier.addTask(key, DataOperation.CHANGE);

}

public class Notifier implements Runnable {

private ConcurrentHashMap services = new ConcurrentHashMap(10 * 1024); //一个阻塞队列 private BlockingQueue tasks = new ArrayBlockingQueue(1024 * 1024); public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { //如果是change,就把key放入一个map中 services.put(datumKey, StringUtils.EMPTY); } //加入阻塞队列 tasks.offer(Pair.with(datumKey, action)); } @Override public void run() { Loggers.DISTRO.info("distro notifier started"); for (; ; ) { try { //从阻塞队列中取出任务 Pair pair = tasks.take(); //处理任务更新服务列表 handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } Notifier是一个Runnable,其中维护了一个tasks(ArrayBlockingQueue)用来存储服务列表的变更事件。他的run方法中是一个死循环,不停的从阻塞队列中取出任务交给handle方法去处理。下面是 DistroConsistencyServiceImpl.Notifier#handle方法 ```java private void handle(Pair pair) { try { String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0; ConcurrentLinkedQueue recordListeners = listeners.get(datumKey); if (recordListeners == null) { Loggers.DISTRO.info("[DISTRO-WARN] RecordListener not found, key: {}", datumKey); return; } //拿到有change的service,RecordListener 就是 service的接口 for (RecordListener listener : recordListeners) { count++; try { //如果是change事件 if (action == DataOperation.CHANGE) { //取出服务 Datum datum = dataStore.get(datumKey); if (datum != null) { //执行linster的change事件。更新服务列表 listener.onChange(datumKey, datum.value); } else { Loggers.DISTRO.info("[DISTRO-WARN] data not found, key: {}", datumKey); } continue; } //处理服务的delete事件 if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } }

handle方法中会找到有变化的RecordListener,其实就是service(change 或者 delete事件)然后,触发onChange方法,其实就是调用 com.alibaba.nacos.naming.core.Service#onChange方法。

publicvoidonChange(String key, Instances value)throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);//遍历service中的所有实例instancefor(Instance instance : value.getInstanceList()){ if(instance == null){ // Reject this abnormal instance list:thrownewRuntimeException("got null instance "+ key);}if(instance.getWeight()>10000.0D){ //设置权重 instance.setWeight(10000.0D);}if(instance.getWeight()0.0D){ instance.setWeight(0.01D);}}//修改IPupdateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));recalculateChecksum();}

该方法中会调用updateIPS去更新服务实例,源码如下

publicvoidupdateIPs(Collection instances,boolean ephemeral){ // 准备一个HashMap,key是cluster,值是集群下的Instance集合 Map ipMap =newHashMap(clusterMap.size());// 获取集群名称存储到map中,key是集群名for(String clusterName : clusterMap.keySet()){ ipMap.put(clusterName,newArrayList());}// 遍历要更新的实例for(Instance instance : instances){ try{ if(instance == null){ Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");continue;}// 判断实例是否包含clusterName,没有的话用默认clusterif(StringUtils.isEmpty(instance.getClusterName())){ instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);}// 判断cluster是否存在,不存在则创建新的clusterif(!clusterMap.containsKey(instance.getClusterName())){ Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); Cluster cluster =newCluster(instance.getClusterName(),this); cluster.init();getClusterMap().put(instance.getClusterName(), cluster);}// 获取当前cluster实例的集合,不存在则创建新的 List clusterIPs = ipMap.get(instance.getClusterName());if(clusterIPs == null){ clusterIPs =newLinkedList(); ipMap.put(instance.getClusterName(), clusterIPs);}// 添加新的实例到 Instance 集合 clusterIPs.add(instance);}catch(Exception e){ Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: "+ instance, e);}}for(Map.Entry entry : ipMap.entrySet()){ //make every ip mine List entryIPs = entry.getValue();// 这里就是在更新注册表 clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);}//设置最后修改时间setLastModifiedMillis(System.currentTimeMillis());// 发布服务变更的通知消息getPushService().serviceChanged(this); StringBuilder stringBuilder =newStringBuilder();for(Instance instance :allIPs()){ stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");} Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}",getNamespaceId(),getName(), stringBuilder.toString());}

上面代码中 ,clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); 就是 在更新服务注册表,因为service#clusterMap 是一个Map 结构,cluster中就是服务实例。然后会调用 .Cluster#updateIps去更新实例。源码如下

publicvoidupdateIps(List ips,boolean ephemeral){ Set toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;//拿到旧的服务列表, HashMap oldIpMap =newHashMap(toUpdateInstances.size());for(Instance ip : toUpdateInstances){ oldIpMap.put(ip.getDatumKey(), ip);}...省略部分代码...// 检查新加入实例的状态 List newIPs =subtract(ips, oldIpMap.values());if(newIPs.size()>0){ Loggers.EVT_LOG .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}",getService().getName(),getName(), newIPs.size(), newIPs.toString());for(Instance ip : newIPs){ //重置服务的健康状态 HealthCheckStatus.reset(ip);}}// 移除要删除的实例 List deadIPs =subtract(oldIpMap.values(), ips);if(deadIPs.size()>0){ Loggers.EVT_LOG .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}",getService().getName(),getName(), deadIPs.size(), deadIPs.toString());for(Instance ip : deadIPs){ //移除 HealthCheckStatus.remv(ip);}} toUpdateInstances =newHashSet(ips);if(ephemeral){ // 直接覆盖旧实例列表 ephemeralInstances = toUpdateInstances;}else{ persistentInstances = toUpdateInstances;}}同步服务到集群

接下来回到 DistroConsistencyServiceImpl#put方法中。刚才说到该方法做了2个事情

onPut(key, value) : 更新服务列表distroProtocol.sync :同步服务到集群

我们现在来看一下sync方法是怎么做的,下面是方法的源码

/** * 开始同步数据到所有的远程服务 * Start to sync data to all remote server. * * @param distroKey distro key of sync data * @param action the action of data operation */publicvoidsync(DistroKey distroKey, DataOperation action,long delay){ //拿到除开自己以外的所有nacos集群中的成员for(Member each : memberManager.allMembersWithoutSelf()){ //构建一个key DistroKey distroKeyWithTarget =newDistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress());//构建一个延迟任务对象 DistroDelayTask distroDelayTask =newDistroDelayTask(distroKeyWithTarget, action, delay);//交给线程池去执行,维护了一个DistroDelayTaskExecuteEngine//任务交给 NacosDelayTaskExecuteEngine 引擎 其中维护了一个ScheduledExecutorService线程池 distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if(Loggers.DISTRO.isDebugEnabled()){ Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());}}}

该方法中会找到所有的nacos集群的成员(除开自己),然后会拿到服务的key(DistroKey )构建一个DistroDelayTask任务对象,交给线程池去执行同步。

这里维护了一个 DelayTaskExecuteEngine 延迟任务执行引擎NacosDelayTaskExecuteEngine,任务的执行通过引擎的 processTasks方法完成com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#processTasks

protectedvoidprocessTasks(){ //拿到所有任务 Collection keys =getAllTaskKeys();for(Object taskKey : keys){ AbstractDelayTask task =removeTask(taskKey);if(null == task){ continue;}//任务执行器 NacosTaskProcessor processor =getProcessor(taskKey);if(null == processor){ getEngineLog().error("processor not found for task, so discarded. "+ task);continue;}try{ // ReAdd task if process failed//执行任务,任务失败会重试if(!processor.process(task)){ retryFailedTask(taskKey, task);}}catch(Throwable e){ getEngineLog().error("Nacos task execute error : "+ e.toString(), e);//重试失败的任务retryFailedTask(taskKey, task);}}}总结

文章有点长,下面做个总结,从大的流程上来说分为如下几个步骤

instanceController接口: nacos服务点接受到注册请求后会把请求解析为Instance,紧接着会执行serviceManager#registerInstance方法注册实例serviceManager#registerInstance方法中会先尝试创建Service对象,并缓存到一个Map> 结构的服务注册表中,然后对每个service做初始化,主要是使用线程池10s一次检查服务是否健康状态,过期的服务会删除掉。serviceManager#registerInstance第二个事情就是执行addInstances方法添加实例,该方法会触发服务列表的更新以及把服务同步到其他nacos集群中。

在这里插入图片描述文章到这里就结束了,如果文章对你有所帮助,请给个好评,你的鼓励是我最大的动力



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3